# -*- coding:utf-8 -*- 
# @Time : 2021/6/10 11:36
# @Author: sl
# @File : kafka_consumer.py
import json
import logging

from kafka import KafkaConsumer
from framework import app
import threading


class KafkaConsumerStart(threading.Thread):
    """Background thread that consumes JSON messages from a Kafka topic.

    The topic, consumer group and bootstrap servers are read from
    ``app.config`` (keys ``KAFKA_TOPIC``, ``KAFKA_GROUP_ID`` and
    ``KAFKA_BOOTSTRAP_SERVERS``) when the thread starts running.
    """

    def run(self):
        """Consume messages, log each value and commit its offset.

        Every message value is decoded as UTF-8 and parsed as JSON by the
        consumer's ``value_deserializer``. After a message is logged, the
        consumer's offsets are committed asynchronously.

        The consumer is always closed when the loop exits, including when
        iteration or processing raises, so its resources are not leaked.
        Exceptions are not swallowed; they propagate out of ``run``.
        """
        consumer = KafkaConsumer(
            app.config["KAFKA_TOPIC"],
            group_id=app.config["KAFKA_GROUP_ID"],
            # Hand records to the loop one at a time.
            max_poll_records=1,
            bootstrap_servers=app.config["KAFKA_BOOTSTRAP_SERVERS"],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )
        # NOTE(review): enable_auto_commit is left at the library default, so
        # the manual commit below may be redundant -- confirm the intended
        # commit strategy.
        try:
            for msg in consumer:  # business processing
                logging.info("消费数据 {}  ".format(msg.value))
                consumer.commit_async()
        finally:
            # Release the consumer even if deserialization or processing
            # raised; previously it was left open when the thread died.
            consumer.close()
